package com.tang.common.utils;

import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializeFilter;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.tang.common.constant.ParamName;
import com.tang.common.utils.cache.ParamCache;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * ES工具类
 * @author tang
 * @date 2021/11/2 16:06
 */
@Slf4j
public class ElasticSearchUtils implements Serializable{

    private  final static RestHighLevelClient restHighLevelClient;

    static {
        restHighLevelClient = SpringUtil.getBean(RestHighLevelClient.class);
    }


    /**
     * Alias operations.
     */
    public static class Alias implements Serializable {

        /**
         * Moves an alias between two indices with one alias-update request: the alias is
         * added to one index and removed from another.
         *
         * @param addIndexName index that receives the alias
         * @param removeIndexName index that loses the alias
         * @param aliasName alias name
         * @return the acknowledged flag of the response
         * @throws IOException propagated from the client call
         */
        public static boolean reindex(String addIndexName,String removeIndexName,String aliasName) throws IOException {
            IndicesAliasesRequest.AliasActions attach =
                    IndicesAliasesRequest.AliasActions.add().index(addIndexName).alias(aliasName);
            IndicesAliasesRequest.AliasActions detach =
                    IndicesAliasesRequest.AliasActions.remove().index(removeIndexName).alias(aliasName);
            return submit(attach, detach);
        }

        /**
         * Adds an alias to an index.
         *
         * @param indexName index that receives the alias
         * @param aliasName alias name
         * @return the acknowledged flag of the response
         * @throws IOException propagated from the client call
         */
        public static boolean addAlias(String indexName,String aliasName) throws IOException {
            return submit(IndicesAliasesRequest.AliasActions.add().index(indexName).alias(aliasName));
        }

        /**
         * Removes an alias from an index.
         *
         * @param indexName index that loses the alias
         * @param aliasName alias name
         * @return the acknowledged flag of the response
         * @throws IOException propagated from the client call
         */
        public static boolean removeAlias(String indexName,String aliasName) throws IOException {
            return submit(IndicesAliasesRequest.AliasActions.remove().index(indexName).alias(aliasName));
        }

        /**
         * Packs the given alias actions, in order, into one request and sends it.
         */
        private static boolean submit(IndicesAliasesRequest.AliasActions... actions) throws IOException {
            IndicesAliasesRequest request = new IndicesAliasesRequest();
            for (IndicesAliasesRequest.AliasActions action : actions) {
                request.addAliasAction(action);
            }
            AcknowledgedResponse response = restHighLevelClient.indices().updateAliases(request, RequestOptions.DEFAULT);
            return response.isAcknowledged();
        }

    }

    /**
     * Aggregation operations.
     */
    public static class Polymerization implements Serializable{
        /**
         * Runs a {@code max} aggregation on one field; the request asks for zero hits
         * ({@code size(0)}), so the aggregation is the payload of interest.
         *
         * @param indexName index name
         * @param aggregateName name given to the aggregation
         * @param aggregateField field whose maximum is requested
         * @return the raw search response
         * @throws IOException propagated from the client call
         */
        public static SearchResponse max(String indexName,String aggregateName,String aggregateField) throws IOException {
            SearchSourceBuilder source = new SearchSourceBuilder()
                    .aggregation(AggregationBuilders.max(aggregateName).field(aggregateField))
                    .size(0);
            return search(indexName, source);
        }

        /**
         * Runs a {@code min} aggregation on one field; the request asks for zero hits
         * ({@code size(0)}).
         *
         * @param indexName index name
         * @param aggregateName name given to the aggregation
         * @param aggregateField field whose minimum is requested
         * @return the raw search response
         * @throws IOException propagated from the client call
         */
        public static SearchResponse min(String indexName,String aggregateName,String aggregateField) throws IOException {
            SearchSourceBuilder source = new SearchSourceBuilder()
                    .aggregation(AggregationBuilders.min(aggregateName).field(aggregateField))
                    .size(0);
            return search(indexName, source);
        }

        /**
         * Runs an {@code avg} aggregation on one field; the request asks for zero hits
         * ({@code size(0)}).
         *
         * @param indexName index name
         * @param aggregateName name given to the aggregation
         * @param aggregateField field whose average is requested
         * @return the raw search response
         * @throws IOException propagated from the client call
         */
        public static SearchResponse avg(String indexName,String aggregateName,String aggregateField) throws IOException {
            SearchSourceBuilder source = new SearchSourceBuilder()
                    .aggregation(AggregationBuilders.avg(aggregateName).field(aggregateField))
                    .size(0);
            return search(indexName, source);
        }
    }

    /**
     * Insert operations.
     */
    public static class Insert implements Serializable{
        /**
         * Indexes a single JSON document under an explicit ID.
         *
         * @param indexName index name
         * @param id document ID
         * @param jsonData document body as a JSON string
         * @return the index response
         * @throws IOException propagated from the client call
         */
        public static IndexResponse add(String indexName, String id, String jsonData) throws IOException {
            IndexRequest indexRequest = new IndexRequest().index(indexName).id(id).source(jsonData, XContentType.JSON);
            return restHighLevelClient.index(indexRequest,RequestOptions.DEFAULT);
        }

        /**
         * Indexes a single JSON document without setting an ID on the request.
         *
         * @param indexName index name
         * @param jsonData document body as a JSON string
         * @return the index response
         * @throws IOException propagated from the client call
         */
        public static IndexResponse add(String indexName, String jsonData) throws IOException {
            IndexRequest indexRequest = new IndexRequest().index(indexName).source(jsonData, XContentType.JSON);
            return restHighLevelClient.index(indexRequest,RequestOptions.DEFAULT);
        }

        /**
         * Bulk-indexes documents with explicit IDs; each value is serialized with fastjson.
         *
         * @param indexName index name
         * @param mapData document ID to object mapping
         * @return the bulk response
         * @throws IOException propagated from the client call
         */
        public static BulkResponse batchAdd(String indexName,Map<String, Object> mapData) throws IOException {
            BulkRequest bulkRequest = new BulkRequest();
            for (Map.Entry<String, Object> entry : mapData.entrySet()) {
                bulkRequest.add(new IndexRequest().index(indexName).id(entry.getKey())
                        .source(JSON.toJSONString(entry.getValue()), XContentType.JSON));
            }
            return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        }

        /**
         * Bulk-indexes documents with explicit IDs; each value is already a JSON string and
         * is stored as-is.
         *
         * @param indexName index name
         * @param mapData document ID to JSON string mapping
         * @return the bulk response
         * @throws IOException propagated from the client call
         */
        public static BulkResponse batchAddJson(String indexName,Map<String, String> mapData) throws IOException {
            BulkRequest bulkRequest = new BulkRequest();
            for (Map.Entry<String, String> entry : mapData.entrySet()) {
                bulkRequest.add(new IndexRequest().index(indexName).id(entry.getKey())
                        .source(entry.getValue(), XContentType.JSON));
            }
            return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        }

        /**
         * Bulk-indexes documents without setting IDs; each element is serialized with fastjson.
         *
         * @param indexName index name
         * @param listData objects to index
         * @return the bulk response
         * @throws IOException propagated from the client call
         */
        public static BulkResponse batchAddNoId(String indexName,List<? extends Object> listData) throws IOException {
            BulkRequest bulkRequest = new BulkRequest();
            for (Object data : listData) {
                bulkRequest.add(new IndexRequest().index(indexName).source(JSON.toJSONString(data),XContentType.JSON));
            }
            return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        }

        /**
         * Bulk-indexes documents without setting IDs; each element is already a JSON string
         * and is stored as-is.
         *
         * @param indexName index name
         * @param listData JSON strings to index
         * @return the bulk response
         * @throws IOException propagated from the client call
         */
        public static BulkResponse batchAddNoIdJson(String indexName,List<String> listData) throws IOException {
            BulkRequest bulkRequest = new BulkRequest();
            // The loop variable must be typed String so the call binds to
            // source(String, XContentType). Typed as Object it binds to the varargs
            // source(Object...) overload, which reads its arguments as field-name/value
            // pairs and would index the JSON text as a field name.
            for (String data : listData) {
                bulkRequest.add(new IndexRequest().index(indexName).source(data,XContentType.JSON));
            }
            return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        }

        /**
         * Bulk-indexes documents without setting IDs, serializing each element with fastjson
         * using the supplied filters.
         *
         * @param indexName index name
         * @param listData objects to index
         * @param serializeFilters fastjson serialize filters passed to {@code JSON.toJSONString}
         * @return the bulk response
         * @throws IOException propagated from the client call
         */
        public static BulkResponse batchAddNoId(String indexName, List<? extends Object> listData, SerializeFilter... serializeFilters) throws IOException {
            BulkRequest bulkRequest = new BulkRequest();
            for (Object data : listData) {
                bulkRequest.add(new IndexRequest().index(indexName).source(JSON.toJSONString(data, serializeFilters),XContentType.JSON));
            }
            return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        }

        /**
         * Bulk-indexes documents WITH explicit IDs, serializing each value with fastjson
         * using the supplied filters. Despite the method name, the map key is used as the
         * document ID; the name is kept for compatibility with existing callers.
         *
         * @param indexName index name
         * @param mapData document ID to object mapping
         * @param serializeFilters fastjson serialize filters passed to {@code JSON.toJSONString}
         * @return the bulk response
         * @throws IOException propagated from the client call
         */
        public static BulkResponse batchAddNoId(String indexName, Map<String, Object> mapData, SerializeFilter... serializeFilters) throws IOException {
            BulkRequest bulkRequest = new BulkRequest();
            for (Map.Entry<String, Object> entry : mapData.entrySet()) {
                bulkRequest.add(new IndexRequest().index(indexName).id(entry.getKey())
                        .source(JSON.toJSONString(entry.getValue(), serializeFilters), XContentType.JSON));
            }
            return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        }
    }

    /**
     * Delete operations.
     */
    public static class Delete implements Serializable{
        /**
         * Deletes several documents by ID in one bulk request.
         *
         * @param indexName index name
         * @param idList IDs of the documents to delete
         * @return the bulk response
         * @throws IOException propagated from the client call
         */
        public static BulkResponse batchDelete(String indexName,List<String> idList) throws IOException {
            BulkRequest bulk = new BulkRequest();
            idList.forEach(docId -> bulk.add(new DeleteRequest().index(indexName).id(docId)));
            return restHighLevelClient.bulk(bulk, RequestOptions.DEFAULT);
        }


        /**
         * Deletes every document matched by a query (delete-by-query).
         *
         * @param indexName index name
         * @param queryBuilder query selecting the documents to delete
         * @return the delete-by-query response
         * @throws IOException propagated from the client call
         */
        public static BulkByScrollResponse batchDelete(String indexName, QueryBuilder queryBuilder) throws IOException {
            DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
            request.setQuery(queryBuilder);
            BulkByScrollResponse response = restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
            return response;
        }
    }

    /**
     * Creates an index, optionally with settings and a mapping.
     *
     * @param indexName index name
     * @param setting index settings; skipped when {@code null}
     * @param mapping index mapping; skipped when {@code null}
     * @return the acknowledged flag of the response
     * @throws IOException propagated from the client call
     */
    public static boolean createIndex(String indexName, Settings setting, XContentBuilder mapping) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        // Settings are optional
        if (null != setting) {
            request.settings(setting);
        }
        // Mapping is optional
        if (null != mapping) {
            request.mapping(mapping);
        }
        AcknowledgedResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    }

    /**
     * Creates an index whose shard and replica counts are read from {@code ParamCache}
     * ({@code DEFAULT_SHARDS_NUMBER} / {@code DEFAULT_REPLICAS_NUMBER}); no mapping is set.
     *
     * @param indexName index name
     * @return the acknowledged flag of the response
     * @throws IOException propagated from the client call
     */
    public static boolean createIndex(String indexName) throws IOException {
        Settings.Builder defaults = Settings.builder();
        // Shards
        defaults.put("index.number_of_shards", ParamCache.getValue(ParamName.DEFAULT_SHARDS_NUMBER));
        // Replicas
        defaults.put("index.number_of_replicas", ParamCache.getValue(ParamName.DEFAULT_REPLICAS_NUMBER));
        Settings settings = defaults.build();
        return createIndex(indexName, settings, null);
    }


    /**
     * Creates an index with explicit shard and replica counts; no mapping is set.
     *
     * @param indexName index name
     * @param shards value for {@code index.number_of_shards}
     * @param replicas value for {@code index.number_of_replicas}
     * @return the acknowledged flag of the response
     * @throws IOException propagated from the client call
     */
    public static boolean createIndex(String indexName,Integer shards,Integer replicas) throws IOException {
        Settings.Builder sizing = Settings.builder();
        // Shards
        sizing.put("index.number_of_shards", shards);
        // Replicas
        sizing.put("index.number_of_replicas", replicas);
        Settings settings = sizing.build();
        return createIndex(indexName, settings, null);
    }

    /**
     * Creates an index whose shard and replica counts are read from {@code ParamCache},
     * with a mapping that only sets the {@code date_detection} flag.
     *
     * @param indexName index name
     * @param automaticTime value written to the mapping's {@code date_detection} field
     * @return the acknowledged flag of the response
     * @throws IOException propagated from building the mapping or from the client call
     */
    public static boolean createIndex(String indexName,Boolean automaticTime) throws IOException {
        Settings.Builder defaults = Settings.builder();
        // Shards
        defaults.put("index.number_of_shards", ParamCache.getValue(ParamName.DEFAULT_SHARDS_NUMBER));
        // Replicas
        defaults.put("index.number_of_replicas", ParamCache.getValue(ParamName.DEFAULT_REPLICAS_NUMBER));
        Settings settings = defaults.build();

        // Mapping body: { "date_detection": <automaticTime> }
        XContentBuilder mapping = XContentFactory.jsonBuilder();
        mapping.startObject();
        mapping.field("date_detection", automaticTime);
        mapping.endObject();

        return createIndex(indexName, settings, mapping);
    }

    /**
     * Fetches the settings of an index.
     *
     * @param indexName index name
     * @return the entry stored under {@code indexName} in the response's index-to-settings map
     * @throws IOException propagated from the client call
     */
    public static Settings getIndexSettings(String indexName) throws IOException {
        GetSettingsRequest settingsRequest = new GetSettingsRequest();
        settingsRequest.indices(indexName);
        return restHighLevelClient.indices()
                .getSettings(settingsRequest, RequestOptions.DEFAULT)
                .getIndexToSettings()
                .get(indexName);
    }


    /**
     * Updates the settings of an index.
     *
     * @param indexName index name
     * @param setting settings to apply
     * @return the acknowledged flag of the response
     * @throws IOException propagated from the client call
     */
    public static boolean updateIndexSettings(String indexName, Settings setting) throws IOException {
        UpdateSettingsRequest settingsUpdate = new UpdateSettingsRequest(indexName);
        settingsUpdate.settings(setting);
        AcknowledgedResponse response = restHighLevelClient.indices().putSettings(settingsUpdate, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    }
    /**
     * Deletes an index.
     *
     * @param indexName index name
     * @return the acknowledged flag of the response
     * @throws IOException propagated from the client call
     */
    public static boolean deleteIndex(String indexName) throws IOException {
        return restHighLevelClient.indices()
                .delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT)
                .isAcknowledged();
    }

    /**
     * Checks whether an index exists.
     *
     * @param indexName index name
     * @return the result of the client's index-exists call
     * @throws IOException propagated from the client call
     */
    public static boolean indexExists(String indexName) throws IOException {
        boolean present = restHighLevelClient.indices().exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
        return present;
    }



    /**
     * Sends a put-mapping request for an index.
     *
     * @param indexName index name
     * @param mapping mapping definition used as the request source
     * @return the acknowledged flag of the response
     * @throws IOException propagated from the client call
     */
    public static boolean createIndexMapping(String indexName, XContentBuilder mapping) throws IOException {
        PutMappingRequest putMappingRequest = new PutMappingRequest(indexName);
        putMappingRequest.source(mapping);
        AcknowledgedResponse putMappingResponse = restHighLevelClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT);
        // Report through the class logger rather than stdout, like the other helpers here.
        log.info("ES put-mapping response for index {}: {}", indexName, putMappingResponse);
        return putMappingResponse.isAcknowledged();
    }

    /**
     * Inserts the document when the ID does not exist yet, otherwise updates it (upsert).
     *
     * <p>Implemented as one update request with {@code docAsUpsert(true)}, so no separate
     * existence check is needed: a concurrent create or delete between a check and the write
     * can no longer make the call fail, and only one round trip is made.
     *
     * @param indexName index name
     * @param targetVo object serialized with fastjson and used as the document body
     * @param id document ID
     * @return {@code true} when the response status is {@code OK} (existing document
     *         updated) or {@code CREATED} (new document inserted)
     * @throws IOException propagated from the client call
     */
    public static boolean updateDoc(String indexName, Object targetVo, String id) throws IOException {
        String json = JSON.toJSONString(targetVo);
        UpdateRequest upsertRequest = new UpdateRequest().index(indexName).id(id)
                .doc(json, XContentType.JSON)
                // Use the same body as the initial document when the ID is missing.
                .docAsUpsert(true);
        UpdateResponse response = restHighLevelClient.update(upsertRequest, RequestOptions.DEFAULT);
        RestStatus status = response.status();
        // A newly created document reports CREATED rather than OK; both mean success.
        return RestStatus.OK.equals(status) || RestStatus.CREATED.equals(status);
    }

    /**
     * Sends a clear-cache request with no index names set, using the
     * {@code lenientExpandOpen} indices options.
     *
     * @throws IOException propagated from the client call
     */
    public static void clearCache() throws IOException {
        IndicesOptions lenient = IndicesOptions.lenientExpandOpen();
        ClearIndicesCacheRequest clearRequest = new ClearIndicesCacheRequest();
        clearRequest.indicesOptions(lenient);
        restHighLevelClient.indices().clearCache(clearRequest, RequestOptions.DEFAULT);
    }

    /**
     * Runs a search against one index and logs the index name and query at INFO level.
     *
     * @param indexName index name
     * @param searchSourceBuilder search source (query, paging, aggregations, ...)
     * @return the raw search response
     * @throws IOException propagated from the client call
     */
    public static SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
        SearchRequest request = new SearchRequest().indices(indexName);
        log.info("ES执行的查询索引{},语句:{}",indexName,searchSourceBuilder);
        request.source(searchSourceBuilder);
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        return response;
    }

    /**
     * Runs a {@code match_all} query against one index. No size is set on the search
     * source, so the number of hits returned is whatever the default yields.
     *
     * @param indexName index name
     * @return the raw search response
     * @throws IOException propagated from the client call
     */
    public static SearchResponse searchAll(String indexName) throws IOException {
        SearchSourceBuilder matchAll = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
        SearchRequest request = new SearchRequest().indices(indexName).source(matchAll);
        return restHighLevelClient.search(request, RequestOptions.DEFAULT);
    }

    /**
     * Looks up one document by ID using a term query on {@code _id}.
     *
     * @param indexName index name
     * @param id document ID
     * @return the document source as a JSON string when exactly one hit is returned;
     *         {@code null} otherwise
     * @throws IOException propagated from the client call
     */
    public static String selectById(String indexName, String id) throws IOException {
        SearchSourceBuilder byId = new SearchSourceBuilder().query(QueryBuilders.termQuery("_id",id));
        SearchResponse response = search(indexName, byId);
        if (response == null) {
            return null;
        }
        SearchHit[] hits = response.getHits().getHits();
        if (hits.length != 1) {
            return null;
        }
        return hits[0].getSourceAsString();
    }

    /**
     * Paged search driven by a raw query string passed through a wrapper query.
     * For example {@code {"match_all": {}}} queries everything.
     *
     * @param indexName index name
     * @param esSql raw query body handed to {@code QueryBuilders.wrapperQuery}
     * @param pageNumber page number; the offset is computed as {@code (pageNumber - 1) * pageSize}
     * @param pageSize page size
     * @return a page holding the total hit count and each hit's source as a map
     * @throws IOException propagated from the client call
     */
    public static Page<Object> selectByEsSql(String indexName, String esSql, Integer pageNumber, Integer pageSize) throws IOException {
        // Paging window and the raw query to execute
        SearchSourceBuilder source = new SearchSourceBuilder()
                .from((pageNumber - 1) * pageSize)
                .size(pageSize)
                .query(QueryBuilders.wrapperQuery(esSql));
        SearchResponse response = search(indexName, source);
        // Page object seeded with the total hit count
        long total = response.getHits().getTotalHits().value;
        Page<Object> page = new Page<>(pageNumber, pageSize, total);
        // Only the source maps are returned; adapt here if version or other metadata is needed
        List<Object> records = Arrays.stream(response.getHits().getHits())
                .map(hit -> hit.getSourceAsMap())
                .collect(Collectors.toList());
        page.setRecords(records);
        return page;
    }

    /**
     * Deletes one document by ID.
     *
     * @param indexName index name
     * @param id document ID
     * @return the numeric status code taken from the delete response's status
     * @throws IOException propagated from the client call
     */
    public static int delete(String indexName, String id) throws IOException {
        DeleteRequest request = new DeleteRequest().index(indexName).id(id);
        return restHighLevelClient.delete(request, RequestOptions.DEFAULT).status().getStatus();
    }

    /**
     * Executes a caller-built bulk request.
     *
     * @param bulkRequest the bulk request to send
     * @return the bulk response
     * @throws IOException propagated from the client call
     */
    public static BulkResponse batch(BulkRequest bulkRequest) throws IOException {
        BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        return response;
    }

}
